# Copyright (C) 2012 Anaconda, Inc
# SPDX-License-Identifier: BSD-3-Clause
"""Helpler functions for subprocess."""

from __future__ import annotations

import os
import sys
from collections import namedtuple
from logging import getLogger
from os.path import abspath
from subprocess import PIPE, CalledProcessError, Popen
from typing import TYPE_CHECKING

from .. import ACTIVE_SUBPROCESSES
from ..auxlib.compat import shlex_split_unicode
from ..auxlib.ish import dals
from ..base.context import context
from ..common.compat import encode_environment, isiterable
from ..common.constants import TRACE
from ..gateways.disk.delete import rm_rf
from ..utils import wrap_subprocess_call

if TYPE_CHECKING:
    from collections.abc import Sequence
    from pathlib import Path

log = getLogger(__name__)
Response = namedtuple("Response", ("stdout", "stderr", "rc"))


def _format_output(command_str, cwd, rc, stdout, stderr):
    return dals(
        """
    $ %s
    ==> cwd: %s <==
    ==> exit code: %d <==
    ==> stdout <==
    %s
    ==> stderr <==
    %s
    """
    ) % (command_str, cwd, rc, stdout, stderr)


def any_subprocess(args, prefix, env=None, cwd=None):
    script_caller, command_args = wrap_subprocess_call(
        context.root_prefix,
        prefix,
        context.dev,
        context.debug,
        args,
    )
    process = Popen(
        command_args,
        cwd=cwd or prefix,
        universal_newlines=False,
        stdout=PIPE,
        stderr=PIPE,
        env=env,
    )
    stdout, stderr = process.communicate()
    if script_caller is not None:
        if "CONDA_TEST_SAVE_TEMPS" not in os.environ:
            rm_rf(script_caller)
        else:
            log.warning(
                f"CONDA_TEST_SAVE_TEMPS :: retaining pip run_script {script_caller}"
            )
    if hasattr(stdout, "decode"):
        stdout = stdout.decode("utf-8", errors="replace")
    if hasattr(stderr, "decode"):
        stderr = stderr.decode("utf-8", errors="replace")
    return stdout, stderr, process.returncode


def subprocess_call(
    command: str | os.PathLike | Path | Sequence[str | os.PathLike | Path],
    env: dict[str, str] | None = None,
    path: str | os.PathLike | Path | None = None,
    stdin: str | None = None,
    raise_on_error: bool = True,
    capture_output: bool = True,
):
    """This utility function should be preferred for all conda subprocessing.
    It handles multiple tricky details.
    """
    env = encode_environment(env or os.environ)
    cwd = sys.prefix if path is None else abspath(path)
    if not isiterable(command):
        command = shlex_split_unicode(command)
    try:
        command_str = os.fspath(command)
    except TypeError:
        # TypeError: command is not a str or PathLike
        command_str = " ".join(map(os.fspath, command))
    log.debug("executing>> %s", command_str)

    pipe = None
    if capture_output:
        pipe = PIPE
    elif stdin:
        raise ValueError("When passing stdin, output needs to be captured")
    else:
        stdin = None

    # spawn subprocess
    process = Popen(
        command,
        cwd=cwd,
        stdin=pipe,
        stdout=pipe,
        stderr=pipe,
        env=env,
        text=True,  # open streams in text mode so that we don't have to decode
        errors="replace",
    )
    ACTIVE_SUBPROCESSES.add(process)

    # decode output, if not PIPE, stdout/stderr will be None
    stdout, stderr = process.communicate(input=stdin)
    rc = process.returncode
    ACTIVE_SUBPROCESSES.remove(process)

    if (raise_on_error and rc != 0) or log.isEnabledFor(TRACE):
        formatted_output = _format_output(command_str, cwd, rc, stdout, stderr)
    if raise_on_error and rc != 0:
        log.info(formatted_output)
        raise CalledProcessError(rc, command, output=formatted_output)
    if log.isEnabledFor(TRACE):
        log.log(TRACE, formatted_output)

    return Response(stdout, stderr, int(rc))


def _subprocess_clean_env(env, clean_python=True, clean_conda=True):
    dels = []
    if clean_python:
        dels.extend(("PYTHONPATH", "PYTHONHOME"))
    if clean_conda:
        dels.extend(
            ("CONDA_ROOT", "CONDA_PROMPT_MODIFIER", "CONDA_EXE", "CONDA_DEFAULT_ENV")
        )
    for key in dels:
        if key in env:
            del env[key]


def subprocess_call_with_clean_env(
    command,
    path=None,
    stdin=None,
    raise_on_error=True,
    clean_python=True,
    clean_conda=True,
):
    # Any of these env vars are likely to mess the whole thing up.
    # This has been seen to be the case with PYTHONPATH.
    env = os.environ.copy()
    _subprocess_clean_env(env, clean_python, clean_conda)
    # env['CONDA_DLL_SEARCH_MODIFICATION_ENABLE'] = '1'
    return subprocess_call(
        command, env=env, path=path, stdin=stdin, raise_on_error=raise_on_error
    )
